/**
* Copyright © 2018-2019, by 晓叹星沉.
*/
package org.aurora.mq.core;

import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Base class for RocketMQ push consumers.
 * </p>
 * <p>
 * Subclasses implement {@link #process(MessageExt)} to handle a single message. The two
 * {@code dealMessage} overloads feed each message of a delivered batch to
 * {@code process} and translate the boolean outcome into the status enum of the
 * concurrent or orderly consumption mode.
 * </p>
 *
 * @author 晓叹星沉
 * @since jdk1.8
 * 2019-07-10
 */
public abstract class AbstractRocketMQPushConsumer {

    private static final Logger logger = LoggerFactory.getLogger(AbstractRocketMQPushConsumer.class);

    /**
     * Handles a single message. Subclasses implement this method with their business logic.
     *
     * @param msg the message to handle; the {@code dealMessage} methods of this class never
     *            pass {@code null}
     * @return {@code true} if the message was handled successfully; {@code false} to make the
     *         calling {@code dealMessage} stop processing the batch and report a failure status
     */
    public abstract boolean process(MessageExt msg);

    /**
     * Default batch handler for concurrent consumption. Override it to customize
     * deserialization or the logic that decides whether consumption succeeded.
     *
     * @param list the batch of messages; {@code null} and {@code null} elements are tolerated
     * @param consumeConcurrentlyContext the consumption context (not used by this implementation)
     * @return {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} as soon as
     *         {@link #process(MessageExt)} returns {@code false} for a message, otherwise
     *         {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    public ConsumeConcurrentlyStatus dealMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        return processAll(list)
                ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                : ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    /**
     * Default batch handler for orderly consumption. Override it to customize
     * deserialization or the logic that decides whether consumption succeeded.
     *
     * @param list the batch of messages; {@code null} and {@code null} elements are tolerated
     * @param consumeOrderlyContext the consumption context (not used by this implementation)
     * @return {@link ConsumeOrderlyStatus#SUSPEND_CURRENT_QUEUE_A_MOMENT} as soon as
     *         {@link #process(MessageExt)} returns {@code false} for a message, otherwise
     *         {@link ConsumeOrderlyStatus#SUCCESS}
     */
    public ConsumeOrderlyStatus dealMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        return processAll(list)
                ? ConsumeOrderlyStatus.SUCCESS
                : ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }

    /**
     * Runs {@link #process(MessageExt)} on every message of the batch, in order, stopping at
     * the first failure.
     *
     * @param list the batch of messages; may be {@code null}
     * @return {@code false} if {@code process} rejected a message, {@code true} otherwise
     */
    private boolean processAll(List<MessageExt> list) {
        if (list == null) {
            // Nothing was delivered, so there is nothing that could have failed.
            return true;
        }
        for (MessageExt messageExt : list) {
            // Check for null BEFORE touching the message: logging dereferences it.
            if (messageExt == null) {
                continue;
            }
            logger.info("receive msgId: {}, topic : {}, tags : {}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getTags());
            if (!process(messageExt)) {
                logger.warn("consume fail , ask for re-consume , msgId: {}", messageExt.getMsgId());
                return false;
            }
        }
        return true;
    }
}
